
#include <iostream>
#include <thread>
#include <chrono>
#include <cassert>
#include "riskcon_client.h"




/// Returns the current std::chrono::system_clock time as a count of
/// microseconds since the clock's epoch (sub-microsecond precision is truncated).
int64_t get_current_timestamp_us()
{
    using namespace std::chrono;

    const auto since_epoch = system_clock::now().time_since_epoch();
    return duration_cast<microseconds>(since_epoch).count();
}


/// Constructs the client by forwarding both arguments to the TSClient base and
/// binding `conn` to the result of GetConnection().
///
/// Also seeds the C library random number generator (srand) with the current
/// time in seconds.
///
/// @param ptcp_dir  passed through to TSClient.
/// @param name      passed through to TSClient.
RiskConClient::RiskConClient(const std::string &ptcp_dir, const std::string &name)
    : TSClient(ptcp_dir, name),
      conn(GetConnection())
{
    const unsigned int seed = static_cast<unsigned int>(time(nullptr));
    srand(seed);
}

void RiskConClient::Run(bool use_shm, const char *server_ipv4, uint16_t server_port)
{
    if (!Connect(use_shm, server_ipv4, server_port, 0))
        return;

    thread shm_thr([this]()
                   {         
                while(!conn.IsClosed()) 
                {                   
                    PollShm();
                    //PollTcp(get_current_timestamp_us());                    
                } });
    
    // we still need to poll tcp for heartbeats even if using shm
    while(!conn.IsClosed())
    {
      PollTcp(get_current_timestamp_us());
    }
    shm_thr.join();
    Stop();
}

/// Allocates an outgoing message with `nlen` payload bytes on the connection,
/// fills in its type and payload, and pushes it.
///
/// @param ntype  value stored in the message header's msg_type field.
/// @param pdata  payload bytes to copy; may be null only when `nlen` is 0.
/// @param nlen   payload size in bytes; must not be negative.
/// @return false if the arguments are invalid or conn.Alloc() returns null
///         (nothing is pushed in that case); true once the message was pushed.
bool RiskConClient::SendMsg(int ntype, char *pdata, int nlen)
{
    // Reject sizes/pointers that would make the copy below undefined.
    if (nlen < 0 || (nlen > 0 && pdata == nullptr))
        return false;

    MsgHeader *header = conn.Alloc(nlen);
    if (!header)
        return false;
    header->msg_type = ntype;

    // The payload lives directly after the header (header + 1), which is the
    // same layout the handlers in OnServerMsg read from. Copying onto `header`
    // itself would overwrite the header fields, including msg_type set above.
    if (nlen > 0)
        memcpy(header + 1, pdata, static_cast<size_t>(nlen));

    conn.Push();
    return true;
}

/// Prints a system error to standard output.
///
/// @param error_msg  description of the failure, printed verbatim.
/// @param sys_errno  errno value; printed as the text returned by strerror().
void RiskConClient::OnSystemError(const char *error_msg, int sys_errno)
{
    const char *errno_text = strerror(sys_errno);
    std::cout << "System Error: " << error_msg
              << " syserrno: " << errno_text
              << std::endl;
}

/// Prints the rejection reason carried in the login response to standard output.
///
/// @param login_rsp  login response; its error_msg field is printed.
void RiskConClient::OnLoginReject(const LoginRspMsg *login_rsp)
{
    const auto &reject_reason = login_rsp->error_msg;
    std::cout << "Login Rejected: " << reject_reason << std::endl;
}


/// Prints a success notice to standard output and returns the current time.
///
/// @param login_rsp  login response; not inspected by this implementation.
/// @return the value of get_current_timestamp_us() taken after the notice is printed.
int64_t RiskConClient::OnLoginSuccess(const LoginRspMsg *login_rsp)
{
    std::cout << "Login Success" << std::endl;

    const int64_t now_us = get_current_timestamp_us();
    return now_us;
}


/// Prints a one-line report of a sequence number mismatch to standard output.
///
/// The line contains the remote name and ptcp file of the connection followed
/// by the six sequence values, local ones first and remote ones after.
void RiskConClient::OnSeqNumberMismatch(uint32_t local_ack_seq,
                                        uint32_t local_seq_start,
                                        uint32_t local_seq_end,
                                        uint32_t remote_ack_seq,
                                        uint32_t remote_seq_start,
                                        uint32_t remote_seq_end)
{
    // Which connection the report is about.
    std::cout << "Seq number mismatch, name: " << conn.GetRemoteName()
              << " ptcp file: " << conn.GetPtcpFile();

    // Local view of the sequence numbers.
    std::cout << " local_ack_seq: " << local_ack_seq
              << " local_seq_start: " << local_seq_start
              << " local_seq_end: " << local_seq_end;

    // Remote view of the sequence numbers; endl terminates and flushes the line.
    std::cout << " remote_ack_seq: " << remote_ack_seq
              << " remote_seq_start: " << remote_seq_start
              << " remote_seq_end: " << remote_seq_end
              << std::endl;
}

// Called by the tcp thread.
/// Prints the disconnect reason to standard output.
///
/// @param reason     description of why the connection ended, printed verbatim.
/// @param sys_errno  errno value; printed as the text returned by strerror().
void RiskConClient::OnDisconnected(const char *reason, int sys_errno)
{
    const char *errno_text = strerror(sys_errno);
    std::cout << "Client disconnected reason: " << reason
              << " syserrno: " << errno_text
              << std::endl;
}

// Called by the APP thread.
/// Dispatches one message received from the server by its msg_type, then pops
/// it from the connection.
///
/// Types 1-4 are recognised but currently have no handler wired in (the
/// handler calls are left as commented-out placeholders). Any other type trips
/// an assert; when asserts are compiled out the message is simply popped.
///
/// @param header  header of the received message; the placeholder handlers
///                expect the payload to start at (header + 1).
void RiskConClient::OnServerMsg(MsgHeader *header)
{
    const auto msg_type = header->msg_type;

    switch (msg_type)
    {
    case 1:
        // handleMsg((Msg1 *)(header + 1));
        break;

    case 2:
        // handleMsg((Msg2 *)(header + 1));
        break;

    case 3:
        // handleMsg((Msg3 *)(header + 1));
        break;

    case 4:
        // handleMsg((Msg4 *)(header + 1));
        break;

    default:
        // Unknown message type: treated as a programming error.
        assert(false);
        break;
    }

    // Release the message regardless of which branch ran.
    conn.Pop();
}

